package org.databandtech.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Streaming word count over a netcat text socket.
 *
 * <p>Before starting the job, open a listening socket with {@code nc -lk 9999}
 * (on Windows: {@code nc -l -p 9999}), then type comma-separated words into it.
 * Every 5 seconds the job prints, per word, how many times it was seen in that
 * window.
 *
 * @author Administrator
 */
public class NetcatFlatMap {

	/** Host of the text socket the job reads from. */
	private static final String SOCKET_HOST = "localhost";

	/** Port of the text socket the job reads from. */
	private static final int SOCKET_PORT = 9999;

	/** Length of each tumbling window, in seconds. */
	private static final long WINDOW_SECONDS = 5;

	/**
	 * Program entry point: obtains the execution environment and runs the job.
	 *
	 * @param args command-line arguments (unused)
	 * @throws IllegalStateException if building or executing the job fails with
	 *         a checked exception; the original failure is kept as the cause
	 */
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		try {
			netSource(env);
		} catch (RuntimeException e) {
			throw e;
		} catch (Exception e) {
			// Propagate instead of only printing the stack trace, so a failed job
			// is not reported as a normal termination.
			throw new IllegalStateException("Flink job 'WordCount1' failed", e);
		}
	}

	/**
	 * Builds the socket word-count pipeline on {@code env} and executes it.
	 *
	 * <p>The raw input lines are printed as-is, and the per-word counts of each
	 * tumbling window are printed as well.
	 *
	 * @param env the environment the pipeline is registered on and executed with
	 * @throws Exception if job execution fails
	 */
	private static void netSource(StreamExecutionEnvironment env) throws Exception {

		DataStreamSource<String> source = env
				.socketTextStream(SOCKET_HOST, SOCKET_PORT);
		source.print();

		// The socket source carries no timestamps or watermarks, so the window is
		// declared as a processing-time window explicitly rather than through the
		// deprecated timeWindow(...), whose meaning depends on the environment's
		// time characteristic.
		DataStream<Tuple2<String, Integer>> dataStream =
				source.flatMap(new Splitter())
				.keyBy(value -> value.f0)
				.window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
				.sum(1);
		dataStream.print();
		env.execute("WordCount1");
	}

	/**
	 * Splits a comma-separated line into {@code (word, 1)} tuples.
	 *
	 * <p>Surrounding whitespace is stripped from each word, and empty words
	 * (from blank lines or consecutive commas) are dropped.
	 */
	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
		private static final long serialVersionUID = -5040122005470861333L;

		/**
		 * Emits one {@code (word, 1)} tuple per non-empty word in {@code sentence}.
		 *
		 * @param sentence one line of comma-separated words
		 * @param out collector receiving the emitted tuples
		 */
		@Override
		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
			for (String token : sentence.split(",")) {
				String word = token.trim();
				if (word.isEmpty()) {
					// Skip blanks so "" never becomes a counted key.
					continue;
				}
				out.collect(Tuple2.of(word, 1));
			}
		}
	}

}
